//package org.codehaus.activemq.transport.jgroups;
//
//import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
//import EDU.oswego.cs.dl.util.concurrent.Executor;
//import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
//import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
//import java.io.IOException;
//import javax.jms.JMSException;
//import org.apache.commons.logging.Log;
//import org.apache.commons.logging.LogFactory;
//import org.codehaus.activemq.message.Packet;
//import org.codehaus.activemq.message.WireFormat;
//import org.codehaus.activemq.transport.AbstractTransportChannel;
//import org.codehaus.activemq.util.JMSExceptionHelper;
//import org.jgroups.Address;
//import org.jgroups.Channel;
//import org.jgroups.ChannelClosedException;
//import org.jgroups.ChannelException;
//import org.jgroups.ChannelNotConnectedException;
//import org.jgroups.Message;
//import org.jgroups.TimeoutException;
//
//public class JGroupsTransportChannel extends AbstractTransportChannel
//  implements Runnable
//{
//  private static final Log log = LogFactory.getLog(JGroupsTransportChannel.class);
//  private Channel channel;
//  private Address localAddress = null;
//  private WireFormat wireFormat;
//  private SynchronizedBoolean closed;
//  private SynchronizedBoolean started;
//  private Object outboundLock;
//  private Executor executor;
//  private Thread thread;
//  private boolean useAsyncSend = false;
//
//  public JGroupsTransportChannel(WireFormat wireFormat, Channel channel, Executor executor) {
//    this.wireFormat = wireFormat;
//    this.channel = channel;
//    this.executor = executor;
//    this.localAddress = channel.getLocalAddress();
//
//    this.closed = new SynchronizedBoolean(false);
//    this.started = new SynchronizedBoolean(false);
//    this.outboundLock = new Object();
//    if (this.useAsyncSend)
//      executor = new PooledExecutor(new BoundedBuffer(1000), 1);
//  }
//
//  public String toString()
//  {
//    return "JGroupsTransportChannel: " + this.channel;
//  }
//
//  public void stop()
//  {
//    if (this.closed.commit(false, true)) {
//      super.stop();
//      try {
//        stopExecutor(this.executor);
//        this.channel.disconnect();
//        this.channel.close();
//      }
//      catch (Exception e) {
//        log.warn("Caught while closing: " + e + ". Now Closed", e);
//      }
//    }
//  }
//
//  public void start()
//    throws JMSException
//  {
//    if (this.started.commit(false, true)) {
//      this.thread = new Thread(this, "Thread:" + toString());
//      this.thread.setDaemon(true);
//      this.thread.start();
//    }
//  }
//
//  public void asyncSend(Packet packet)
//    throws JMSException
//  {
//    if (this.executor != null) {
//      try {
//        this.executor.execute(new Runnable(packet) { private final Packet val$packet;
//
//          public void run() { try { JGroupsTransportChannel.this.writePacket(this.val$packet);
//            } catch (JMSException e)
//            {
//              JGroupsTransportChannel.this.onAsyncException(e);
//            } } } );
//      }
//      catch (InterruptedException e)
//      {
//        log.info("Caught: " + e, e);
//      }
//    }
//    else
//      writePacket(packet);
//  }
//
//  public boolean isMulticast()
//  {
//    return true;
//  }
//
//  public void run()
//  {
//    log.trace("JGroups consumer thread starting");
//    while (!this.closed.get())
//      try {
//        Object value = this.channel.receive(0L);
//        if ((value instanceof Message)) {
//          Message message = (Message)value;
//
//          if (!this.localAddress.equals(message.getSrc())) {
//            byte[] data = message.getBuffer();
//            Packet packet = this.wireFormat.fromBytes(data);
//            if (packet != null) {
//              doConsumePacket(packet);
//            }
//
//          }
//
//        }
//
//      }
//      catch (IOException e)
//      {
//        doClose(e);
//      }
//      catch (ChannelClosedException e) {
//        stop();
//      }
//      catch (ChannelNotConnectedException e) {
//        doClose(e);
//      }
//      catch (TimeoutException e)
//      {
//      }
//  }
//
//  protected void writePacket(Packet packet)
//    throws JMSException
//  {
//    try
//    {
//      synchronized (this.outboundLock) {
//        Address dest = null;
//        Message message = new Message(dest, this.localAddress, this.wireFormat.toBytes(packet));
//        this.channel.send(message);
//      }
//    }
//    catch (ChannelException e) {
//      throw JMSExceptionHelper.newJMSException("writePacket failed: " + e, e);
//    }
//    catch (IOException e) {
//      throw JMSExceptionHelper.newJMSException("writePacket failed: " + e, e);
//    }
//  }
//
//  private void doClose(Exception ex)
//  {
//    if (!this.closed.get()) {
//      onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
//      stop();
//    }
//  }
//}